
LakeSoul Automatic Asynchronous Data Cleanup Service

Tip: This feature is available in version 3.0.0 and above.

In data warehouses, it is often necessary to define the lifecycle of table data in order to save storage space and reduce costs.

On the other hand, tables that are updated in real time can accumulate redundant data. Each compaction writes a new compaction file that contains all historical data, so once a new compaction file exists, every earlier compaction file is redundant.

In addition, for a table that is continuously updated and compacted, a user who only cares about data changes within a recent time range can clean up all data written before a specific compaction. This preserves one full snapshot of the data while still allowing incremental reads and snapshot queries over the recent time range.
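
The retention rule can be sketched as a shell function. This is a hedged illustration of one reading of the rule, not LakeSoul's implementation: it assumes a hypothetical commit list with one line per commit in the form <epoch_seconds> <type> <file>, and treats the newest compaction at or before a cutoff time as the full snapshot to keep. Everything written before that compaction, including all older compaction files, is reported as removable.

```shell
#!/usr/bin/env bash
# Sketch only: the commit-list format is hypothetical, not LakeSoul's metadata schema.
# Input lines: <epoch_seconds> <type> <file>, with <type> "compaction" or "append".

# files_to_clean CUTOFF_EPOCH < commits
# Prints every file written strictly before the newest compaction at or before
# CUTOFF_EPOCH. That compaction (a full snapshot) and all later commits are kept.
# If no compaction exists at or before the cutoff, nothing is printed.
files_to_clean() {
  awk -v cutoff="$1" '
    { ts[NR] = $1 + 0; file[NR] = $3 }
    $2 == "compaction" && $1 + 0 <= cutoff + 0 && (!found || $1 + 0 > anchor) {
      anchor = $1 + 0
      found = 1
    }
    END {
      if (!found) exit
      for (i = 1; i <= NR; i++)
        if (ts[i] < anchor) print file[i]
    }'
}

# Example: keep 3 days of history (commits.txt is a hypothetical export).
# files_to_clean "$(( $(date +%s) - 3 * 86400 ))" < commits.txt
```

Because only data older than the retained compaction is reported, every commit inside the retention window stays available for incremental reads in this model.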

Before version 3.0.0, the automatic cleanup service ran once a day as a scheduled job: it scanned all metadata to find expired files and then deleted them, which placed a high instantaneous load on the metadata service.
Starting from version 3.0.0, the cleanup service has been completely reimplemented as an asynchronous, real-time mechanism. It consumes metadata change events through CDC and uses Flink's timer mechanism to trigger cleanup, which is more efficient and significantly reduces the load on the metadata service.
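
The shift from a periodic full scan to event-driven timers can be illustrated with a toy model in plain shell; no Flink or CDC is involved. Each change event, given here as a hypothetical line <commit_epoch> <file>, schedules its own expiration timer at commit time plus a TTL, and timers are only evaluated on fixed ticks, roughly the role the timer trigger interval plays. Treating the TTL as the data expiration time is an assumption made for illustration.

```shell
#!/usr/bin/env bash
# Toy model only: no Flink, no CDC. Input lines: <commit_epoch> <file>.
# simulate_timers TTL STEP END < events
#   TTL  : seconds until a commit expires (assumed to match the data expiration time)
#   STEP : how often timers are checked (the role of the timer trigger interval)
#   END  : simulation end time
# Prints "<tick> <file>" for each timer that fires on or before END.
simulate_timers() {
  local ttl="$1" step="$2" end="$3"
  # Each change event registers its own timer at commit time + TTL.
  awk -v ttl="$ttl" '{ print $1 + ttl, $2 }' |
    # Order timers by due time, like a timer queue.
    sort -n -k1,1 |
    # A timer fires on the first tick (a multiple of STEP) at or after its due time.
    awk -v step="$step" -v end="$end" '{
      tick = int(($1 + step - 1) / step) * step
      if (tick <= end + 0) print tick, $2
    }'
}
```

In this model each timer is handled once, when it comes due, and nothing rescans the full metadata, which is the load problem the pre-3.0.0 daily scan had.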

To start the cleanup service, submit the NewCleanJob entry class as a Flink job. For example:

./bin/flink run \
-c org.apache.flink.lakesoul.entry.clean.NewCleanJob \
lakesoul-flink-1.20-3.0.0-SNAPSHOT.jar \
--source_db.dbName lakesoul_test \
--source_db.user lakesoul_test \
--source_db.host localhost \
--source_db.port 5432 \
--source_db.password lakesoul_test \
--slotName flink \
--plugName pgoutput \
--url jdbc:postgresql://localhost:5432/lakesoul_test \
--ontimer_interval "1" \
--dataExpiredTime "5"
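
The job reads metadata changes through a PostgreSQL logical replication slot (--slotName) using the pgoutput plugin (--plugName), and PostgreSQL only supports logical decoding when wal_level is set to logical. A small pre-flight check might look like the sketch below; it reuses the example connection values above, and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Pre-flight sketch: logical replication slots need wal_level = logical.
# Connection values mirror the example command; function names are hypothetical.

# wal_level_ok VALUE: succeed only when VALUE is "logical".
wal_level_ok() {
  [ "$1" = "logical" ]
}

# current_wal_level: print the server's wal_level via psql
# (-t: tuples only, -A: unaligned output, -c: run one command).
current_wal_level() {
  PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test \
    -d lakesoul_test -tAc 'SHOW wal_level'
}

# Usage (changing wal_level requires a PostgreSQL restart):
# wal_level_ok "$(current_wal_level)" ||
#   echo "set wal_level = logical in postgresql.conf and restart PostgreSQL" >&2
```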

Parameter Configuration

| Parameter Name | Required | Description |
|---|---|---|
| --source_db.dbName | yes | PostgreSQL database name |
| --source_db.user | yes | PostgreSQL username |
| --source_db.host | yes | PostgreSQL host |
| --source_db.port | yes | Database port |
| --source_db.password | yes | Database password |
| --slotName | yes | Logical replication slot name |
| --plugName | yes | Logical replication plugin name |
| --url | yes | JDBC URL of the PostgreSQL database |
| --ontimer_interval | no | Timer trigger interval, in minutes (default: 5) |
| --dataExpiredTime | no | Data expiration time, in days (default: 3) |
| --source.parallelism | no | Source read parallelism (default: 1) |
| --targetTableName | no | Tables to clean up, e.g. public.testTable; separate multiple tables with commas. If not specified, cleanup applies to all tables. |
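
A hypothetical wrapper that assembles the arguments from environment variables can make the optional parameters easier to manage. The variable names (PG_HOST, TTL_DAYS, TARGET_TABLES, and so on) and the function names are illustrative assumptions; only the flags come from the table. Optional flags are passed only when set, so the job's own defaults apply otherwise.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the example command. The environment variable
# names are illustrative; the --flags are the ones in the parameter table.

# opt FLAG VALUE: print the flag and value only when VALUE is non-empty.
opt() {
  if [ -n "$2" ]; then printf '%s\n' "$1" "$2"; fi
}

# clean_job_args: print the NewCleanJob arguments, one per line.
# Connection values default to those in the example; PG_PASSWORD is required.
clean_job_args() {
  if [ -z "${PG_PASSWORD:-}" ]; then
    echo "PG_PASSWORD must be set" >&2
    return 1
  fi
  local db="${PG_DB:-lakesoul_test}" user="${PG_USER:-lakesoul_test}"
  local host="${PG_HOST:-localhost}" port="${PG_PORT:-5432}"
  printf '%s\n' \
    -c org.apache.flink.lakesoul.entry.clean.NewCleanJob \
    "${JAR:-lakesoul-flink-1.20-3.0.0-SNAPSHOT.jar}" \
    --source_db.dbName "$db" \
    --source_db.user "$user" \
    --source_db.host "$host" \
    --source_db.port "$port" \
    --source_db.password "$PG_PASSWORD" \
    --slotName "${SLOT_NAME:-flink}" \
    --plugName pgoutput \
    --url "jdbc:postgresql://$host:$port/$db"
  # Optional flags: omitted when unset, so the job's defaults apply.
  opt --ontimer_interval "${TIMER_MINUTES:-}"
  opt --dataExpiredTime "${TTL_DAYS:-}"
  opt --source.parallelism "${PARALLELISM:-}"
  opt --targetTableName "${TARGET_TABLES:-}"   # e.g. public.t1,public.t2
}

# run_clean_job: submit the job with bin/flink (mapfile needs bash 4+).
run_clean_job() {
  local out args
  out=$(clean_job_args) || return
  mapfile -t args <<< "$out"
  "${FLINK_HOME:-.}/bin/flink" run "${args[@]}"
}
```

For example, PG_PASSWORD=lakesoul_test TTL_DAYS=5 TARGET_TABLES=public.testTable run_clean_job would submit the job for a single table with a five-day expiration time.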